use std::thread;

use log::{error, info};
use scraper::Selector;
use serde::Deserialize;
use serde_json::{json, Value};
use serde_yaml::Mapping;
use ureq::Error;

use crate::datasource::record::Record;
use crate::datasource::DataSource;
use crate::endpoint::{EndPoint, Token};
use crate::summary;

/// Number of pending task records requested from the datasource per poll in
/// `Task::start`; a poll that returns fewer records than this is followed by a sleep.
const BATCH_SIZE: i32 = 50;

/// One entry of the `tasks` section in `indexer.yml`.
#[derive(Deserialize, PartialEq, Debug)]
pub struct Task {
    // NOTE(review): presumably the name of the datasource this task reads from;
    // it is not used in this file — confirm against the caller.
    pub datasource: String,
    /// Copied into the `index` of every record before it is pushed to the endpoint
    /// (see `start`).
    pub index: String,
    // NOTE(review): `table`, `primary`, `sql` and `scroll_by_id` are not read in this
    // file; presumably they are consumed by the datasource implementation — confirm.
    pub table: Option<String>,
    #[serde(default)]
    pub primary: Option<String>,
    #[serde(default)]
    pub sql: Option<String>,
    #[serde(default)]
    pub scroll_by_id: bool,
    /// Pause in milliseconds between polls in `start` when a poll returns less than
    /// a full batch.
    pub interval: Option<u64>,
    /// Raw configured batch size; `0` (the value used when the key is missing) means
    /// "not configured". Read it through `Task::batch_size()`, which substitutes 1000.
    #[serde(default)]
    pub batch_size: u32,
    /// Per-column rules, keyed by column name, consumed by `mapping_cell_i_value`
    /// and `mapping_cell_s_value`.
    #[serde(default)]
    pub fields: Mapping,
}

impl Task {
    /// Returns the effective batch size: the configured `batch_size` when it is
    /// non-zero, otherwise the built-in fallback of 1000.
    pub fn batch_size(&self) -> u32 {
        match self.batch_size {
            0 => 1000,
            configured => configured,
        }
    }
}

/// Preprocessors that can be configured for a string column; the `Display`
/// implementation yields the key under which each one appears in the field mapping.
enum TaskFieldPreprocess {
    /// Split a string into an array.
    Split,
    /// Parse a string as JSON.
    Json,
    /// Strip HTML down to plain text.
    HtmlStrip,
    /// Convert Markdown to plain text.
    Markdown,
}

impl std::fmt::Display for TaskFieldPreprocess {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            TaskFieldPreprocess::Split => write!(f, "split"),
            TaskFieldPreprocess::Json => write!(f, "json"),
            TaskFieldPreprocess::HtmlStrip => write!(f, "html_strip"),
            TaskFieldPreprocess::Markdown => write!(f, "markdown"),
        }
    }
}

impl Task {
    /// Placeholder for a deeper self-test of the task.
    ///
    /// Currently performs no checks and always returns `Ok(true)`.
    pub fn test(&self) -> Result<bool, String> {
        Ok(true)
    }

    /**
     * 将整个对象中的字段值替换成配置中的映射值
     * @param row: &Value
     * @return Value
     */
    pub fn mapping_value(&self, row: &Value) -> Value {
        let mut new_object = row.clone();
        if let Some(obj) = row.as_object() {
            for (key, value) in obj.iter() {
                if value.is_i64() || value.is_u64() {
                    //数值处理
                    let new_value = self.mapping_cell_i_value(key, value.as_i64().unwrap());
                    //combine all keys in new_value to new_object
                    for (k, v) in new_value.as_object().unwrap().iter() {
                        new_object[k] = v.clone();
                    }
                } else if value.is_string() {
                    let s_value = value.as_str().unwrap().to_string();
                    //字符串处理
                    let new_value = self.mapping_cell_s_value(key, &s_value);
                    //combine all keys in new_value to new_object
                    for (k, v) in new_value.as_object().unwrap().iter() {
                        new_object[k] = v.clone();
                    }
                }
            }
        }
        new_object
    }

    /// Looks up the replacement configured for an integer column value.
    ///
    /// When the entry of `fields` for `col_name` is a mapping, the first of its
    /// entries whose key is a YAML number equal to `col_value` supplies the
    /// replacement value. In every other case the value is kept unchanged.
    ///
    /// * `col_name` - name of the column being mapped.
    /// * `col_value` - integer value of the column.
    ///
    /// Returns a single-entry JSON object keyed by `col_name`.
    pub fn mapping_cell_i_value(&self, col_name: &String, col_value: i64) -> Value {
        let replacement = match self.fields.get(col_name) {
            // Number-to-value lookup table.
            Some(serde_yaml::Value::Mapping(table)) => {
                table.iter().find_map(|(candidate, target)| match candidate {
                    serde_yaml::Value::Number(n) if n.as_i64() == Some(col_value) => Some(target),
                    _ => None,
                })
            }
            _ => None,
        };
        match replacement {
            Some(target) => json!({ col_name: target }),
            // No rule matched: return the value as-is.
            None => json!({ col_name: col_value }),
        }
    }

    /**
     * 获取配置中的字符串字段映射
     * @param col_name: &String
     * @param col_value: &String
     * @return Value
     */
    pub fn mapping_cell_s_value(&self, col_name: &String, col_value: &String) -> Value {
        match self.fields.get(&col_name) {
            Some(serde_yaml::Value::Mapping(mapping)) => {
                //字符串处理，当前支持split
                if mapping.contains_key(TaskFieldPreprocess::Split.to_string()) {
                    let split_options = match mapping.get(TaskFieldPreprocess::Split.to_string()) {
                        Some(split) => (
                            split["new_field_name"].as_str().unwrap_or(col_name),
                            split["delimiter"].as_str().unwrap_or(","),
                            split["type"].as_str().unwrap_or("string"),
                        ),
                        None => (col_name.as_str(), ",", "string"),
                    };

                    //let delimiter: &str = split["delimiter"].as_str().unwrap_or(",");
                    //let elem_type: &str = split["type"].as_str().unwrap_or("string");
                    //let new_field_name: &str = split["new_field_name"].as_str().unwrap_or(col_name);

                    if split_options.2 == "long"
                        || split_options.2 == "integer"
                        || split_options.2 == "number"
                    {
                        //整数
                        let list: Vec<i64> = col_value
                            .split(split_options.1)
                            .map(|s| s.parse::<i64>().unwrap_or(0))
                            .collect();
                        return json!({
                            split_options.0: list
                        });
                    } else if split_options.2 == "float" || split_options.2 == "double" {
                        //小数
                        let list: Vec<f64> = col_value
                            .split(split_options.1)
                            .map(|s| s.parse::<f64>().unwrap_or(0.0))
                            .collect();
                        return json!({
                            split_options.0: list
                        });
                    } else {
                        //字符串
                        let list: Vec<&str> =
                            col_value.split(split_options.1).map(|s| s.trim()).collect();
                        return json!({
                            split_options.0: list
                        });
                    }
                } else if mapping.contains_key(TaskFieldPreprocess::Json.to_string()) {
                    match serde_json::from_str::<Value>(col_value) {
                        Ok(json_value) => {
                            return json!({
                                col_name: json_value
                            });
                        }
                        Err(e) => {
                            error!("failed to parse json string {:?}, reason: {:?}", col_value, e);
                        }
                    }
                } else if mapping.contains_key(TaskFieldPreprocess::HtmlStrip.to_string()) {
                    //get mapping ,if is empty return new Mapping
                    let html_strip_options =
                        match mapping.get(TaskFieldPreprocess::HtmlStrip.to_string()) {
                            Some(html_strip) => (
                                html_strip["new_field_name"].as_str().unwrap_or(col_name), //new field name
                                html_strip["extract_images_to_field"].as_str().unwrap_or(""), //images field name
                                html_strip["extract_summary_to_field"].as_str().unwrap_or(""), //summary field name
                                html_strip["summary_sentences"].as_i64().unwrap_or(2), //summary sentences
                            ),
                            None => (col_name.as_str(), "", "", 0),
                        };
                    let html = scraper::Html::parse_fragment(col_value);
                    let text = html.root_element().text().collect::<Vec<_>>().join("");
                    let mut json = json!({
                        html_strip_options.0: text
                    });
                    if html_strip_options.1.len() > 0 {
                        //extract images
                        let mut images = Vec::new();
                        let selector = Selector::parse("img").unwrap();
                        for img in html.select(&selector) {
                            if let Some(src) = img.value().attr("src") {
                                images.push(src);
                            }
                        }
                        json[html_strip_options.1] = json!(images);
                    }
                    if html_strip_options.2.len() > 0 {
                        //extract summary
                        let summary = summary::summarize(
                            text.as_str(),
                            &["\n", "\r", "。", "！", "？"],
                            html_strip_options.3 as usize,
                        );
                        json[html_strip_options.2] = json!(summary);
                    }
                    return json;
                } else if mapping.contains_key(TaskFieldPreprocess::Markdown.to_string()) {
                    let markdown_options =
                        match mapping.get(TaskFieldPreprocess::Markdown.to_string()) {
                            Some(html_strip) => (
                                html_strip["new_field_name"].as_str().unwrap_or(col_name), //new field name
                                html_strip["extract_images_to_field"].as_str().unwrap_or(""), //images field name
                                html_strip["extract_summary_to_field"].as_str().unwrap_or(""), //summary field name
                                html_strip["summary_sentences"].as_i64().unwrap_or(2), //summary sentences
                            ),
                            None => (col_name.as_str(), "", "", 0),
                        };
                    let md = pulldown_cmark::Parser::new(col_value);
                    let mut html = String::new();
                    pulldown_cmark::html::push_html(&mut html, md);
                    let html_elem = scraper::Html::parse_fragment(html.as_str());
                    let text = html_elem.root_element().text().collect::<Vec<_>>().join("");

                    let mut json = json!({
                        markdown_options.0: text
                    });
                    if markdown_options.1.len() > 0 {
                        //extract images
                        let mut images = Vec::new();
                        let selector = Selector::parse("img").unwrap();
                        let html = scraper::Html::parse_fragment(&html);
                        for img in html.select(&selector) {
                            if let Some(src) = img.value().attr("src") {
                                images.push(src);
                            }
                        }
                        json[markdown_options.1] = json!(images);
                    }
                    if markdown_options.2.len() > 0 {
                        //extract summary
                        let summary = summary::summarize(
                            text.as_str(),
                            &["\n", "\r", "。", "！", "？"],
                            markdown_options.3 as usize,
                        );
                        json[markdown_options.2] = json!(summary);
                    }
                    return json;
                }
                //原样返回
                json!({
                    col_name: col_value
                })
            }
            Some(serde_yaml::Value::String(str)) => {
                //字段名转换
                let new_col_name = str.to_string();
                if new_col_name.len() == 0 {
                    // skip the empty column name
                    return Value::Null;
                }
                json!({
                    new_col_name: col_value
                })
            }
            _ => {
                json!({
                    col_name: col_value
                })
            }
        }
    }

    /// start thread to handle increment update
    pub fn start(&self, name: &String, source: &DataSource, endp: &EndPoint, verbose: bool) {
        if let Ok(ids) = source.inst() {
            let mut token = Token::new(); //indexea token
            loop {
                // read tasks from 'indexea_tasks'
                let mut count = 0;
                match ids.tasks(name, &self, BATCH_SIZE) {
                    Err(e) => {
                        error!("failed to load todo tasks for {}, reason: {}", name, e)
                    }
                    Ok(mut records) => {
                        count = records.len() as i32;
                        if verbose {
                            info!("{} tasks of [{}] to process", count, name);
                        }
                        if count > 0 {
                            for rec in &mut records {
                                rec.index = self.index.clone();
                            }

                            let mut is_token_ready = true;
                            // refresh when need
                            if token.is_expired() {
                                match endp.token() {
                                    Ok(t) => token = t,
                                    Err(Error::Status(code, resp)) => {
                                        is_token_ready = false;
                                        if let Ok(reason) = resp.into_string() {
                                            error!("failed to request access token with code {}, reason: {:?}", code, reason);
                                        } else {
                                            error!(
                                                "failed to request access token with code {}",
                                                code
                                            );
                                        }
                                    }
                                    Err(Error::Transport(t)) => {
                                        is_token_ready = false;
                                        if let Some(msg) = t.message() {
                                            error!(
                                                "failed to request access token, reason: {}",
                                                msg
                                            );
                                        }
                                    }
                                }
                            }
                            if is_token_ready {
                                let mut mapping_records = records
                                    .iter()
                                    .map(|r| Record {
                                        value: self.mapping_value(&r.value),
                                        index: r.index.clone(),
                                        task: r.task.clone(),
                                    })
                                    .collect();
                                // push to endpoint
                                let results = endp.push(&token, &mut mapping_records);
                                //write back to datasource
                                if let Err(e) = ids.finish(results) {
                                    error!("failed flush tasks status, reason: {}", e)
                                }
                            }
                        }
                    }
                }
                if count < BATCH_SIZE {
                    thread::sleep(std::time::Duration::from_millis(self.interval.unwrap()));
                }
            }
        }
    }
}

impl Clone for Task {
    /// Produces an independent copy of the task configuration, field by field.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to `Task` without handling it
        // here becomes a compile error.
        let Task {
            datasource,
            index,
            table,
            primary,
            sql,
            scroll_by_id,
            interval,
            batch_size,
            fields,
        } = self;
        Task {
            datasource: datasource.clone(),
            index: index.clone(),
            table: table.clone(),
            primary: primary.clone(),
            sql: sql.clone(),
            scroll_by_id: *scroll_by_id,
            interval: *interval,
            batch_size: *batch_size,
            fields: fields.clone(),
        }
    }
}
